Kubernetes 源码笔记(kube-controller-endpoint-controller)

endpoint 是一组 Pod 的集合的抽象,是 Kubernetes 里内置的资源类型,可以通过 Service 的 label 进行筛选得到。endpoint 往往伴随 service 诞生,对于定义 ExternalName 的 Service 或 clusterIP 直接定义为 None 的 Service(Headless Service) 来说,endpoint 也可以是不存在的。 endpoint 可以用来筛选出同 service 关联的所有 pod,将流量导入到 service 后,经过负载均衡的处理发送到这些 pod 上。

也可以单独定义 Endpoint 和 Service. 如果我们需要将 Kubernetes 集群之外的外部服务定义为 Kubernetes 内部的一个服务,可以分别定义 Service 和 Endpoints,Service 不需要定义标签选择器,定义一个与 Service 同名的 Endpoints,就可以与之关联。Endpoints 中的 subsets 中可以定义需要连接的外部服务器的 IP 和端口。

endpoint-controller 的基本逻辑是通过 informer 获取 Service,再根据 labels selector 筛选出相应的 pod,接着创建或更新 endpoints 对象,endpoints 其实就是一组 pod 的集合,只不过其中包括了 Node 的信息,而它的 name 属性即为 service 的 name。

初始化

这部分代码在 cmd/kube-controller-manager/app/core.go 中,跟其他的 controller 类似,通过 goroutine 创建 Controller,调用 Run 方法

1
2
3
4
5
6
7
8
9
func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) {
go endpointcontroller.NewEndpointController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Services(),
ctx.InformerFactory.Core().V1().Endpoints(),
ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
).Run(int(ctx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Stop)
return nil, true, nil
}

可以看到 endpoint 初始化时,会注册三个 informer, 分别是 podInformer, serviceInformer, endpointsInformer, 在 NewEndpointController 方法中,会对 service, pod 资源进行 watch 操作,注册对应的 add, update, delete 等操作,这里就不给出代码了。举例来说,如果监听到了 add pod 的操作,就调用 addPod 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (e *EndpointController) getPodServiceMemberships(pod *v1.Pod) (sets.String, error) {
set := sets.String{}
services, err := e.serviceLister.GetPodServices(pod)
if err != nil {
// don't log this error because this function makes pointless
// errors when no services match.
return set, nil
}
for i := range services {
key, err := controller.KeyFunc(services[i])
if err != nil {
return nil, err
}
set.Insert(key)
}
return set, nil
}

// When a pod is added, figure out what services it will be a member of and
// enqueue them. obj must have *v1.Pod type.
func (e *EndpointController) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
services, err := e.getPodServiceMemberships(pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
return
}
for key := range services {
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
}
}

addPod 中,首先实例化一个 pod 对象,找到它属于哪一个 service, 然后将 service 的集合以 namespace/name 为 key 加入到队列中。

启动

Run 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
// ...
if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
}

go func() {
defer utilruntime.HandleCrash()
e.checkLeftoverEndpoints()
}()

<-stopCh
}

通过 WaitForCacheSync 等待 pod,service,endpoint 的缓存同步完之后,根据配置启动相应数量(由 kube-controller-manager 启动参数中的 --concurrent-endpoint-syncs 决定)的 worker 进行处理,我们接着看 worker 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (e *EndpointController) worker() {
for e.processNextWorkItem() {
}
}

func (e *EndpointController) processNextWorkItem() bool {
eKey, quit := e.queue.Get()
if quit {
return false
}
defer e.queue.Done(eKey)

err := e.syncService(eKey.(string))
e.handleErr(err, eKey)

return true
}

很简单的逻辑,从队列中取出 key,根据这个 key 可以获取 namespace,service name, 调用 syncService 方法进行处理。基本上 syncService 就是 endpoint-controller 的主体逻辑。

主体逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
func (e *EndpointController) syncService(key string) error {
// ...
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
service, err := e.serviceLister.Services(namespace).Get(name)
if err != nil {
// service 已经删除了,删除对应的 endpoint
err = e.client.CoreV1().Endpoints(namespace).Delete(name, nil)
if err != nil && !errors.IsNotFound(err) {
return err
}
return nil
}

if service.Spec.Selector == nil {
return nil
}

glog.V(5).Infof("About to update endpoints for service %q", key)
pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
if err != nil {
return err
}
// TolerateUnreadyEndpointsAnnotation ...

subsets := []v1.EndpointSubset{}
var totalReadyEps int = 0
var totalNotReadyEps int = 0

for _, pod := range pods {
// 如果 pod 的 ip 为空,继续循环
if len(pod.Status.PodIP) == 0 {
glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
continue
}
// 如果 pod 正在被删除,继续循环
if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
continue
}

epa := *podToEndpointAddress(pod)

hostname := pod.Spec.Hostname
if len(hostname) > 0 && pod.Spec.Subdomain == service.Name && service.Namespace == pod.Namespace {
epa.Hostname = hostname
}

// headless service 可以没有端口
if len(service.Spec.Ports) == 0 {
if service.Spec.ClusterIP == api.ClusterIPNone {
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
// No need to repack subsets for headless service without ports.
}
} else {
// 遍历 service 的端口
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]

portName := servicePort.Name
portProto := servicePort.Protocol
portNum, err := podutil.FindPort(pod, servicePort)
if err != nil {
glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
continue
}

var readyEps, notReadyEps int
epp := &v1.EndpointPort{Name: portName, Port: int32(portNum), Protocol: portProto}
subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
totalReadyEps = totalReadyEps + readyEps
totalNotReadyEps = totalNotReadyEps + notReadyEps
}
}
}
subsets = endpoints.RepackSubsets(subsets)

// 如果 endpoint 不存在,重新创建一个
currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
if err != nil {
if errors.IsNotFound(err) {
currentEndpoints = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: service.Name,
Labels: service.Labels,
},
}
} else {
return err
}
}

// currentEndpoints 的版本为空,表明需要创建 endpoint
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
// 如果 currentEndpoints 的 subset 和整理后的 subsets 相等,并且 label 与 service 的 label 一致,则忽略本次更新操作
if !createEndpoints &&
apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
apiequality.Semantic.DeepEqual(currentEndpoints.Labels, service.Labels) {
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return nil
}
newEndpoints := currentEndpoints.DeepCopy()
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels
if newEndpoints.Annotations == nil {
newEndpoints.Annotations = make(map[string]string)
}

glog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
if createEndpoints {
// 如果没有同一个命名空间下没有同名的 endpoint, 则生成新的 endpoint
_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(newEndpoints)
} else {
// 如果已经存在,就更新
_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(newEndpoints)
}
if err != nil {
// ...
}
return nil
}

首先通过 namespace,name 获取 service,如果 service 不存在,说明 service 已经被删除了,删除对应的 endpoint,这里有个没能解决的 bug 是,如果 service 删除了,但是这时 endpoint 刚好挂了,那么对应的 endpoint 就没法删除。Run 方法中调用的 checkLeftoverEndpoints 方法在 endpoint-controller 的启动阶段会做一次清理工作,它会拉取所有的 endpoint 的资源对象,遍历一遍,如果发现没有对应的 service, 把 endpoint 的 name 当做 key 传入队列,这时进入循环后会发现这个 endpoint 不能获取对应的 service name, 就会被删除。

接着通过 namespace,Selector 获取对应的 Pod 的集合,遍历这些 Pod,一一构建 endpoint 的 Subset,其中包括 Pod 的 IP 地址,Hostname,所在的 Node name 等信息。遍历结束后,通过 Service 的 Namespace 和 Name 获取当前的 Endpoint,与构建的新的 endpoint 进行比较,决定创建还是更新 endpoint 对象,通过 client 进行相应的调用。对于没有 Selector 的 Service,不需要创建相应的 endpoint。

简单来说,使用 Kubernetes 的时候我们不需要刻意关注 endpoint 资源对象,只有创建 service 的时候使用了 selector, 就可以自动创建 endpoint. 如果单独创建一个不绑定 service 的 endpoint, 它会被 Kubernetes 自动清理的。

References